/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.query.running;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

/**
 * Class to track heavy queries (long-running or producing a big result-set).
 */
public final class HeavyQueriesTracker {
    /** Check period in ms. */
    private static final long CHECK_PERIOD = 1_000;

    /**
     * Default result set size threshold (in rows): a warning is printed when the number of fetched rows
     * reaches the threshold.
     */
    private static final long DFLT_FETCHED_SIZE_THRESHOLD = 100_000;

    /** Message about the long execution of the query. */
    public static final String LONG_QUERY_EXEC_MSG = "Query execution is too long";

    /** Message about the successful completion of a long running query. */
    public static final String LONG_QUERY_FINISHED_MSG = "Long running query is finished";

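    /** Message prefix about a long running query that finished with an error; the error message is appended. */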
    public static final String LONG_QUERY_ERROR_MSG = "Long running query is finished with error: ";

    /** Message about a query that produced a big result set. */
    public static final String BIG_RESULT_SET_MSG = "Query produced big result set.";

    /** Queries collection. Sorted collection isn't used to reduce 'put' time. */
    private final ConcurrentHashMap<TrackableQuery, TimeoutChecker> qrys = new ConcurrentHashMap<>();

    /** Check worker. */
    private final GridWorker checkWorker;

    /** Logger. */
    private final IgniteLogger log;

    /** Long query timeout milliseconds. */
    private volatile long timeout;

    /**
     * Long query timeout multiplier. The warning will be printed after:
     * - timeout;
     * - timeout * multiplier;
     * - timeout * multiplier * multiplier;
     * - etc...
     *
     * If the multiplier <= 1, the warning message is printed once.
     */
    private volatile int timeoutMult = 2;

    /** Query result set size threshold. */
    private volatile long rsSizeThreshold = DFLT_FETCHED_SIZE_THRESHOLD;

    /**
     * Result set size threshold multiplier. The warning will be printed after:
     * - size of result set > threshold;
     * - size of result set > threshold * multiplier;
     * - size of result set > threshold * multiplier * multiplier;
     * - etc.
     *
     * If the multiplier <= 1, the warning message is printed once.
     */
    private volatile int rsSizeThresholdMult = 2;

    /**
     * Creates the tracker: obtains the logger, builds the periodic check worker, reads the long query
     * warning timeout from the SQL configuration and starts the worker in a daemon thread.
     *
     * @param ctx Kernal context.
     */
    public HeavyQueriesTracker(GridKernalContext ctx) {
        log = ctx.log(HeavyQueriesTracker.class);

        checkWorker = new GridWorker(ctx.igniteInstanceName(), "long-qry", log) {
            @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
                // No explicit exit condition: the loop ends only when an exception propagates out of it
                // (presumably the interruption raised from U.sleep once the worker is cancelled).
                for (;;) {
                    checkLongRunning();

                    U.sleep(CHECK_PERIOD);
                }
            }
        };

        timeout = ctx.config().getSqlConfiguration().getLongQueryWarningTimeout();

        IgniteThread checkerThread = new IgniteThread(checkWorker);

        checkerThread.setDaemon(true);
        checkerThread.start();
    }

    /**
     * Stops the tracker: cancels the periodic check worker and forgets all registered queries.
     */
    public void stop() {
        checkWorker.cancel();

        qrys.clear();
    }

    /**
     * Registers the query for the periodic long-running check. Nothing is registered when the current
     * timeout is not positive.
     *
     * @param qryInfo Query info to register.
     */
    public void startTracking(TrackableQuery qryInfo) {
        assert qryInfo != null;

        // The volatile field is read once so that the check and the new checker use the same value.
        long curTimeout = timeout;

        if (curTimeout <= 0)
            return;

        qrys.put(qryInfo, new TimeoutChecker(curTimeout, timeoutMult));
    }

    /**
     * Unregisters the query and, if it ran longer than the current long query timeout, logs that it has
     * finished (successfully or with an error).
     * <p>
     * Nothing is logged when the timeout is not positive: {@link #startTracking(TrackableQuery)} treats such
     * a timeout as "tracking is off", so finished queries must not be reported as long running either.
     *
     * @param qryInfo Query info to remove.
     * @param err Exception if query executed with error.
     */
    public void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) {
        assert qryInfo != null;

        qrys.remove(qryInfo);

        // Single volatile read: both checks below must see the same value.
        long timeout0 = timeout;

        // The report does not depend on whether the entry was still registered: with a multiplier <= 1
        // the periodic check removes the entry after the first warning, yet the completion is still logged.
        if (timeout0 <= 0 || qryInfo.time() <= timeout0)
            return;

        if (err == null)
            LT.warn(log, LONG_QUERY_FINISHED_MSG + qryInfo.queryInfo(null));
        else
            LT.warn(log, LONG_QUERY_ERROR_MSG + err.getMessage() + qryInfo.queryInfo(null));
    }

    /**
     * Creates a new result-set checker configured with the current result set size threshold and
     * threshold multiplier.
     *
     * @param qryInfo Query info.
     * @return New result-set checker for the query.
     */
    public ResultSetChecker resultSetChecker(TrackableQuery qryInfo) {
        long threshold = rsSizeThreshold;
        int thresholdMult = rsSizeThresholdMult;

        return new ResultSetChecker(log, qryInfo, threshold, thresholdMult);
    }

    /**
     * Walks over all registered queries and prints a warning for each one whose execution time exceeded
     * the timeout of its checker. A query whose checker has a multiplier {@code <= 1} is unregistered
     * after the warning, so the warning is printed only once for it.
     */
    private void checkLongRunning() {
        for (Map.Entry<TrackableQuery, TimeoutChecker> entry : qrys.entrySet()) {
            TrackableQuery qry = entry.getKey();
            TimeoutChecker checker = entry.getValue();

            if (!checker.checkTimeout(qry.time()))
                continue;

            LT.warn(log, LONG_QUERY_EXEC_MSG + qry.queryInfo(null));

            if (checker.timeoutMult <= 1)
                qrys.remove(qry);
        }
    }

    /**
     * Gets the current long query timeout.
     *
     * @return Timeout in milliseconds after which long query warning will be printed.
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * Sets timeout in milliseconds after which long query warning will be printed.
     * <p>
     * The new value is used for queries registered afterwards; a checker created for an already
     * registered query keeps the timeout it was created with.
     *
     * @param timeout Timeout in milliseconds after which long query warning will be printed.
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Gets the long query timeout multiplier, exactly as it was last set.
     *
     * @return Long query timeout multiplier.
     */
    public int getTimeoutMultiplier() {
        return timeoutMult;
    }

    /**
     * Sets long query timeout multiplier. The warning will be printed after:
     * - timeout;
     * - timeout * multiplier;
     * - timeout * multiplier * multiplier;
     * - etc...
     * If the multiplier <= 1, the warning message is printed once.
     * <p>
     * The value is stored as is and is used for queries registered afterwards.
     *
     * @param timeoutMult Long query timeout multiplier.
     */
    public void setTimeoutMultiplier(int timeoutMult) {
        this.timeoutMult = timeoutMult;
    }

    /**
     * Gets the result set size threshold.
     *
     * @return Threshold of the result's row count: a warning is printed when the count of fetched rows
     *      reaches the threshold.
     */
    public long getResultSetSizeThreshold() {
        return rsSizeThreshold;
    }

    /**
     * Sets the result set size threshold: a warning is printed when the count of fetched rows reaches
     * the threshold.
     * <p>
     * The new value is used by result-set checkers created afterwards.
     *
     * @param rsSizeThreshold Threshold of the result's row count: a warning is printed when the count of
     *      fetched rows reaches the threshold.
     */
    public void setResultSetSizeThreshold(long rsSizeThreshold) {
        this.rsSizeThreshold = rsSizeThreshold;
    }

    /**
     * Gets result set size threshold multiplier. The warning will be printed after:
     *  - size of result set reaches threshold;
     *  - size of result set reaches threshold * multiplier;
     *  - size of result set reaches threshold * multiplier * multiplier;
     *  - etc.
     * If the multiplier <= 1, the warning message is printed once.
     *
     * @return Result set size threshold multiplier.
     */
    public int getResultSetSizeThresholdMultiplier() {
        return rsSizeThresholdMult;
    }

    /**
     * Sets result set size threshold multiplier. Values less than {@code 1} are replaced with {@code 1}.
     *
     * @param rsSizeThresholdMult Result set size threshold multiplier.
     */
    public void setResultSetSizeThresholdMultiplier(int rsSizeThresholdMult) {
        this.rsSizeThresholdMult = Math.max(1, rsSizeThresholdMult);
    }

    /**
     * Holds timeout settings for the specified query.
     */
    private static class TimeoutChecker {
        /** Execution time in ms that must be exceeded for the next warning; grows after every warning. */
        private long timeout;

        /** Multiplier applied to the timeout after each warning. Values {@code <= 1} leave it unchanged. */
        private final int timeoutMult;

        /**
         * @param timeout Initial timeout.
         * @param timeoutMult Timeout multiplier.
         */
        public TimeoutChecker(long timeout, int timeoutMult) {
            this.timeout = timeout;
            this.timeoutMult = timeoutMult;
        }

        /**
         * Checks whether the given execution time exceeds the current timeout. When it does and the
         * multiplier is greater than {@code 1}, the timeout is multiplied for the next check.
         *
         * @param time Current execution time of the query.
         * @return {@code true} if timeout occurred.
         */
        public boolean checkTimeout(long time) {
            if (time <= timeout)
                return false;

            if (timeoutMult > 1) {
                // Saturate instead of overflowing: a wrapped (negative) timeout would make every
                // subsequent check report a timeout.
                timeout = timeout > Long.MAX_VALUE / timeoutMult ? Long.MAX_VALUE : timeout * timeoutMult;
            }

            return true;
        }
    }

    /**
     * Counts the rows fetched for a single query and warns when the count reaches the configured
     * result set size threshold.
     */
    public static class ResultSetChecker {
        /** Logger. */
        private final IgniteLogger log;

        /** Query info. */
        private final TrackableQuery qryInfo;

        /** Row count at which the next warning is printed; {@code 0} or less turns the check off. */
        private long threshold;

        /** Result set size threshold multiplier. */
        private final int thresholdMult;

        /** Fetched count of rows. */
        private long fetchedSize;

        /** Set once the threshold has been reached at least once. */
        private boolean bigResults;

        /**
         * @param log Logger.
         * @param qryInfo Query info.
         * @param threshold Initial result set size threshold.
         * @param thresholdMult Result set size threshold multiplier.
         */
        private ResultSetChecker(IgniteLogger log, TrackableQuery qryInfo, long threshold, int thresholdMult) {
            this.log = log;
            this.qryInfo = qryInfo;
            this.threshold = threshold;
            this.thresholdMult = thresholdMult;
        }

        /**
         * Accounts one more fetched row and prints a warning when the fetched count reaches the current
         * threshold. After a warning the threshold is multiplied by the multiplier or, if the multiplier
         * is not greater than {@code 1}, reset to {@code 0} so that no further warnings are printed on fetch.
         */
        public void checkOnFetchNext() {
            fetchedSize++;

            if (threshold <= 0 || fetchedSize < threshold)
                return;

            LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched=" + fetchedSize));

            threshold = thresholdMult > 1 ? threshold * thresholdMult : 0;

            bigResults = true;
        }

        /**
         * Prints the big result set warning with the final fetched count, if the threshold was reached
         * at least once.
         */
        public void checkOnClose() {
            if (!bigResults)
                return;

            LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched=" + fetchedSize));
        }

        /**
         * @return Count of rows fetched so far.
         */
        public long fetchedSize() {
            return fetchedSize;
        }
    }
}
